-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preload (FCI) filehandlers for eager processing #2686
base: main
Are you sure you want to change the base?
Conversation
First beginning of work to preload filehandlers before files are present. Not much implementation yet, just a skeleton on what it might look like in the YAMLReader.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2686 +/- ##
==========================================
+ Coverage 96.05% 96.08% +0.02%
==========================================
Files 370 370
Lines 54320 54861 +541
==========================================
+ Hits 52177 52713 +536
- Misses 2143 2148 +5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Pull Request Test Coverage Report for Build 10528467872Details
💛 - Coveralls |
Prepare GeoSegmentYAMLReader for preloading filehandlers for files corresponding to segments that don't exist yet. Early draft implementation that appears to work with limitations. Implementation in GEOSegmentYAMLReader still needs tweaking, tests need to be improved, and the corresponding file handler (for now just FCI) needs to be able to handle it.
Continue the pre-loading implementation in the GEOSegmentedYAMLReader. Add unit tests.
Don't raise an error that we can't predict the remaining files if this functionality was not requested.
Add a Preloadable class to netcdf_utils. This so far implements pre-loading filehandlers for to-be-expected files if a single one already exists, taking a defined set of data variables and their attributes from the first segment. Still to be implemented is to take other information from other repeat cycles, by on-disk caching.
Cache data variables between repeat cycles.
Continue working on repeat-cycle caching.
The good news is that my test script now passes without errors. The bad news is that the resulting image has no data / is all black. |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side request as mentioned on slack: Move the "does NetCDF file exist" functionality to its own function that is Delayed. Hold on to the result of this in your file handler(s). Pass the result of that (the filename I think) to the _get_delayed_value
function which is also delayed, and then go on as you are. This way the globbing and sleeping only has to happen once.
There is something wrong with the values and the calibration. For one test case, for one pixel, loading |
When preloading, unset automaskandscale.
The bug with the wrong values was because automaskandscale was being applied automatically by the NetCDF library. After unsetting this, the values seem to match with the reference. This image was created while scene creation, loading, and creating the resampling graph happening before accessing any data: Processing took 28.4 seconds, took 1.89 GB RAM, and used 121% CPU. Classical processing took 19.0 seconds, took 2.03 GB RAM, and used 73% CPU. Now that it's functional, I will work on adding tests and improving the implementation. |
Very nice! How long did it take after the last segment had "arrived"? |
Split the delayed loading from file into one delayed function that waits for the file and another that loads a variable from the file.
Add tests for the functionality to wait for a file to appear in a delayed object.
Add a test method to test getting a delayed value from a (delayed) file
PRogress un nit tests fol the puproses of delayed olading
I haven't done a full test including a realistic simulation of delayed segment arrival yet. But when I skip computing, it takes 19 seconds with 421 MB RAM. |
Add more tests for the preloadable mixin in netcdf_utils. Cleanup some unused code and add checks/guards in this mixin.
Improve tests for the yaml reader in case of preloading file handlers. Tests fail.
Improve the tests for the YAML reader and the NetCDF utils. Tolerate absence of time_tags and variable_tags for preloadables. Verify presence of required_netcdf_filehandlers on creation rather than on caching time.
Using datetime.min for an artificial date leads to different strftime results between platforms (see https://bugs.python.org/msg307401). Use datetime.max instead.
Simplify _new_filehandler_instances. Hopefully this will satisfy codescene. Replace "/dev/null" by os.devnull for cross-platform support.
In test waiting for file to appear, wait longer. Maybe this will fix the "file not found" on 3.11 problem.
Fixing three merge conflicts.
When caching, make sure we use the CachingFileManager already upon scene creation and not only by the time we are loading.
Don't subclass netCDF4.Dataset, rather just return an instance from a helper function. Seems good enough and gets rid of the weird error messages upon exit.
Some readers read entire groups; this needs xarray kwargs to be set even if caching is used.
solving one merge conflict
Guard against file handle caching with the xarray manager when preloading. We can't cache very well when data are not there yet.
I get mixed results with |
Creating the dask distributed client makes subsequent scene loading extremely slow. The test script (intended to simulate what trollflow2 does): import hdf5plugin
import satpy
satpy.config.set({"readers.preload_segments": True})
satpy.config.set({"readers.preload_dask_distributed": True})
from satpy import Scene
from satpy.writers import compute_writer_results
from dask.distributed import Client
import dask.config
import time
fci_file = "/media/nas/x21308/scratch/FCI/202312201500b/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20231220150628_IDPFI_OPE_20231220150007_20231220150017_N_JLS_C_0091_0001.nc"
res500 = ["vis_06"]
res1000 = ["vis_08", "vis_08", "nir_16", "ir_38", "ir_105"]
res2000 = ["wv_63", "wv_63", "wv_73", "ir_87", "ir_97", "ir_123", "ir_133", "airmass", "convection", "dust", "ash"]
def main():
cl = Client()
t1 = time.time()
sc = Scene(filenames={"fci_l1c_nc": [fci_file]})
#sc = Scene(filenames={"fci_l1c_nc": fci_files})
t2 = time.time()
sc.load(res500 + res1000 + res2000, generate=False)
t3 = time.time()
ls1 = sc.resample("nqceur500m", resampler="gradient_search")
ls2 = sc.resample("nqceur1km", resampler="gradient_search")
ls3 = sc.resample("nqceur2km", resampler="gradient_search")
t4 = time.time()
comps = []
for c in res500:
comps.append(ls1.save_dataset(c, filename=f"{c:s}.png",
compute=False, writer="simple_image"))
for c in res1000:
comps.append(ls2.save_dataset(c, filename=f"{c:s}.png",
compute=False, writer="simple_image"))
for c in res2000:
comps.append(ls3.save_dataset(c, filename=f"{c:s}.png",
compute=False, writer="simple_image"))
t5 = time.time()
print("Scene creation", t2-t1)
print("Loading", t3-t2)
print("Resampling", t4-t3)
print("Saving (without computing)", t5-t4)
if __name__ == "__main__":
main() Testing with aca09ff, without Without
With
No such difference is observed with satpy main or with #2822 (which I merged into this one). According to |
Correction: according to |
…ndler When segment-shareable data are obtained from the reference filehandler, get this directly from its ``file_content`` attribute and not via the filehandlers ``__getitem__`` method. This solves the performance problem using preloading with dask distributed. Details: ``NetCDF4FileHandler.__getitem__`` turns a netCDF4.Variable into an xarray.DataArray encapsulating a dask array. Therefore, a small read, such as the FCI L1C NC file handler needs to perform upon ``Scene.load``, becomes a small compute. Those small computes have a small negative impact when using the normal dask scheduler, but a terrible negative impact when using the dask distributed scheduler, where in one case I tested, ``Scene.load`` increases from 10 seconds to 375 seconds. Time for ``Scene.load`` with preloaded segments with the regular scheduler, before this commit: 10.8 seconds Time for ``Scene.load`` with preloaded segments with the distributed scheduler, before this commit: 376 seconds Time for ``Scene.load`` with preloaded segments with the regular scheduler, after this commit: 6.5 seconds Time for ``Scene.load`` with preloaded segments with the distributed scheduler, after this commit: 6.9 seconds
Solved the dask distributed performance problem in ec49a6f:
|
So the item in |
Yes. That predates this PR.
Not as part of the preload process, but in the FCI file handler for either regular loading or pre-loading. For example, it loads the satellite location and the Sun-Earth distance: satpy/satpy/readers/fci_l1c_nc.py Lines 428 to 436 in a5c5022
Those are parameters that are constant between segments, so when we are pre-loading segments 2–40 (by pre-loading I mean the FileHandler handling a file that is expected, but not yet available), those parameters are taken from segment 1. When passed a |
Ah ok and your change was specifically for things being loaded from the other "ref" segment. Ok. And these things being loaded are stored as variables in the NetCDF files and not attributes? |
Yes and yes. |
I do not manage to get the task ordering to work reliably with dask distributed using the secede/rejoin approach proposed by @mraspaud on 2024-06-12 in a situation with a realistic complexity. Even with seceding/rejoining, it seems hard to control when dask finally reaches tasks depending on early chunks. A case study, based on debug statements, shows an example with 8 workers:
Results appear differently in detail on a different machine or even depending on how I run (in a satpy test script, through trollflow2s satpy_cli, via trollflow2 messaging). So even with the secede/rejoin complications, we are idling for 48 seconds before calculations start, even though by then 7 chunks are already available. Although the tasks are doing plenty of secede/rejoin, they are just jumping back and forth between tasks depending on ten chunks that aren't there yet, then two more after 44/45 seconds. Only after 62 seconds have the workers alternated between tasks that have dependencies on all possible chunks. I don't know if dask is ping-ponging between the same tasks (that are immediately seceding) and letting others lie, or if it's actually going by all tasks, and we have so many of them that it takes 64 seconds to reach the last chunk, in an order we still do not control. Based on this and other problems I encounter with dask distributed, and the fact it would take significant development work on satpy to make it suit our needs (writing geotiff), I am inclined to give up on full dask distributed compatibility. |
Move preloading documentations to its own section under "Advanced topics". Fix internal reference links.
Rename the preloading configuration parameters to use their own subsection.
Merge three very similar test functions to a single parametrised one.
solving one merge conflict
Thought by @mraspaud on another way to possibly resolve the ordering: https://pytroll.slack.com/archives/C0LNH7LMB/p1723793710070629 Or explicitly make tasks depend on other tasks, not sure how. |
Add functionality to preload filehandlers for files that have not yet arrived on disk. This works by passing a single file (the first chunk) and then it will generate glob patterns for all remaining chunks.
It seems to work for loading only FDHSI data.
Scene.load
call, including for preloaded segments.Deferred: support for mixing FDHSI and HRFI. This PR is ambitious enough as it is, and I would like to postpone that complication to a later PR.
As of 2024-06-20, it works with some limitations.
The default scheduler offers no control of the order of tasks. The result is that dask tasks waiting for segments 33–40 might be scheduled first, while segments 2–32 are coming in but the corresponding tasks being later in the queue. With the dask.distributed scheduler we can avoid this problem, but the dask distributed scheduler has limited support in Satpy. PR #2822 makes the FCI reader work with dask.distributed, but nearest neighbour resampling or the GeoTIFF writer still fail (see #1762). Therefore, to use eager processing, the user either: